一个轻量级分布式 RPC 框架 下

一个轻量级分布式 RPC 框架 下

一个轻量级分布式 RPC 框架 上

3,编写客户端模块

设定Client端的实现功能接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.nia.rpc.core.client;

import com.nia.rpc.core.protocol.Response;

import java.lang.reflect.Method;

/**
 * Client-side contract of the RPC framework: service discovery and invocation.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
public interface Client {
// Sends one RPC call (service class + target method + arguments) and returns the response.
Response sendMessage(Class<?> clazz, Method method, Object[] args);
// Builds a dynamic proxy of the service interface whose calls are routed through sendMessage.
<T> T proxyInterface(Class<T> serviceInterface);
// Releases client resources (ZooKeeper connection, channel pools, event loops).
void close();
}

实现 RPC 代理

因会用到动态代理,那就先设计动态代理实现的接口:

1
2
3
4
5
6
7
8
9
10
11
package com.nia.rpc.core.rpcproxy;

import com.nia.rpc.core.client.Client;

/**
 * Strategy interface for producing a dynamic proxy of a service interface;
 * implementations route intercepted calls through the supplied {@link Client}.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
public interface RpcProxy {
<T> T proxyInterface(Client client, final Class<T> serviceInterface);
}

这里使用CGLib 来实现 RPC 代理(当然也可以使用Java 提供的动态代理技术实现 ),具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.nia.rpc.core.rpcproxy;

import com.nia.rpc.core.client.Client;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

/**
 * CGLib-based {@link RpcProxy}: generates a subclass of the service interface
 * and forwards every business method call to the remote service via the Client.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
public class CglibRpcProxy implements RpcProxy {

    /**
     * Creates a CGLib-enhanced implementation of {@code serviceInterface} whose
     * method invocations are intercepted and sent over RPC through {@code client}.
     */
    @Override
    @SuppressWarnings("unchecked") // Enhancer.create() returns Object; superclass is serviceInterface
    public <T> T proxyInterface(Client client, Class<T> serviceInterface) {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(serviceInterface);
        enhancer.setCallback(new CglibInteceptor(client, serviceInterface));
        Object enhancedObject = enhancer.create();
        return (T) enhancedObject;
    }

    /**
     * Static nested class performing the per-method CGLib interception.
     */
    private static class CglibInteceptor implements MethodInterceptor {

        // Object's universal methods are answered locally instead of being
        // sent over the wire.
        private static Method hashCodeMethod;
        private static Method equalsMethod;
        private static Method toStringMethod;

        static {
            try {
                hashCodeMethod = Object.class.getMethod("hashCode");
                equalsMethod = Object.class.getMethod("equals", Object.class);
                toStringMethod = Object.class.getMethod("toString");
            } catch (NoSuchMethodException e) {
                throw new NoSuchMethodError(e.getMessage());
            }
        }

        /** Identity hash of the proxy instance. */
        private int proxyHashCode(Object proxy) {
            return System.identityHashCode(proxy);
        }

        /** Identity equality: two proxies are equal only if they are the same object. */
        private boolean proxyEquals(Object proxy, Object other) {
            return (proxy == other);
        }

        /** Object-style toString; uses identityHashCode directly to avoid re-entering the interceptor. */
        private String proxyToString(Object proxy) {
            return proxy.getClass().getName() + '@' + Integer.toHexString(System.identityHashCode(proxy));
        }

        // RPC client plus the service interface being proxied.
        private Client client;

        private Class<?> serviceInterface;

        public CglibInteceptor(Client client, Class<?> serviceInterface) {
            this.client = client;
            this.serviceInterface = serviceInterface;
        }

        @Override
        public Object intercept(Object o, Method method, Object[] args, MethodProxy proxy) throws Throwable {
            // Answer Object's universal methods locally on the enhanced instance `o`.
            // BUG FIX: the original passed the MethodProxy `proxy` here, so
            // hashCode/equals/toString operated on the wrong object.
            if (hashCodeMethod.equals(method)) {
                return proxyHashCode(o);
            }
            if (equalsMethod.equals(method)) {
                return proxyEquals(o, args[0]);
            }
            if (toStringMethod.equals(method)) {
                return proxyToString(o);
            }
            // Everything else is a real remote call.
            return client.sendMessage(serviceInterface, method, args).getResponse();
        }
    }
}

客户端handler设计

在服务端完成处理后,对服务端的返回信息进行封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.nia.rpc.core.utils;

import com.nia.rpc.core.protocol.Response;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Static holder shared by the client and its channel handlers: maps a request id
 * to the BlockingQueue on which the caller waits for that request's Response.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
public final class ResponseMapHelper {

    /** requestId -> single-slot queue the waiting caller polls; entries are removed after each call. */
    public static final ConcurrentMap<Long, BlockingQueue<Response>> responseMap = new ConcurrentHashMap<>();

    private ResponseMapHelper() {
        // Utility holder — never instantiated.
    }
}

然后在客户端对返回信息进行处理:

因为代码注释很详细,细节就不说了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.nia.rpc.core.client;

import com.nia.rpc.core.protocol.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;

import static com.nia.rpc.core.utils.ResponseMapHelper.responseMap;

/**
 * Inbound handler that routes each decoded {@link Response} to the queue a
 * caller registered under the response's request id.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
@ChannelHandler.Sharable /* shared across channels: all state lives in ResponseMapHelper's concurrent map */
public class RpcClientHandler extends SimpleChannelInboundHandler<Response> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Response msg) throws Exception {
        // Look up the queue the caller registered for this request id.
        BlockingQueue<Response> blockingQueue = responseMap.get(msg.getRequestId());
        if (blockingQueue == null) {
            // No waiter left (request timed out and was cleaned up) — drop the response.
            LOGGER.warn("No pending request for response id {}, dropping", msg.getRequestId());
            return;
        }
        // offer() instead of put(): the queue has capacity 1, and a blocking put
        // on a duplicate/late response would stall the Netty IO thread forever.
        if (!blockingQueue.offer(msg)) {
            LOGGER.warn("Response queue already filled for id {}, dropping duplicate", msg.getRequestId());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("Exception caught on {}, ", ctx.channel(), cause);
        ctx.channel().close();
    }
}

客户端创建连接并保存

commons.pool2 对象池的使用

在客户端这里建立个对象池来保存并复用和提供服务端连接channel的信息,并在这里进行创建客户端连接然后保存,减少重复创建所带来的资源损耗

关于此对象池,想进一步了解,请看commons.pool2 对象池的使用,这里就不多说了

通过上个说明文档链接先搞个池对象工厂的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.nia.rpc.core.utils;

import com.nia.rpc.core.client.RpcClientHandler;
import com.nia.rpc.core.protocol.RpcDecoder;
import com.nia.rpc.core.protocol.RpcEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * commons-pool2 factory for Netty client connections:
 * creates pooled channels (create/wrap), validates them before reuse
 * (validateObject) and closes them on eviction (destroyObject).
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
public class ConnectionObjectFactory extends BasePooledObjectFactory<Channel> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionObjectFactory.class);

    // One event loop group shared by every pooled connection.
    // BUG FIX: the original allocated a new NioEventLoopGroup(1) per channel and
    // never shut it down, leaking an IO thread for each connection created.
    private static final NioEventLoopGroup WORKER_GROUP = new NioEventLoopGroup(1);

    private final String ip;
    private final int port;

    public ConnectionObjectFactory(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Opens one connection to {@code ip:port} with the RPC codec pipeline installed.
     *
     * @throws InterruptedException if the connect attempt is interrupted — propagated
     *         so the pool sees a failure instead of receiving a null channel
     *         (the original returned null, which breaks the pool contract)
     */
    private Channel createNewConChannel() throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .group(WORKER_GROUP)
                .handler(new ChannelInitializer<Channel>() {
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                                // 10 MB max frame for decoded responses
                                .addLast(new RpcDecoder(10 * 1024 * 1024))
                                .addLast(new RpcEncoder())
                                .addLast(new RpcClientHandler());
                    }
                });
        final ChannelFuture f = bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.TCP_NODELAY, true)
                .connect(ip, port).sync();
        f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                LOGGER.info("Connect success {} ", f);
            }
        });
        final Channel channel = f.channel();
        channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
        return channel;
    }

    @Override
    public Channel create() throws Exception {
        return createNewConChannel();
    }

    /** Must wrap the channel; returning null here makes borrowObject() fail. */
    @Override
    public PooledObject<Channel> wrap(Channel obj) {
        return new DefaultPooledObject<>(obj);
    }

    /** Closes the underlying channel when the pool destroys the object. */
    @Override
    public void destroyObject(PooledObject<Channel> p) throws Exception {
        p.getObject().close().addListener((ChannelFutureListener) future -> LOGGER.info("Close Finish"));
    }

    /** A pooled connection is reusable only while its channel is still active. */
    @Override
    public boolean validateObject(PooledObject<Channel> p) {
        Channel object = p.getObject();
        return object.isActive();
    }
}

然后做个channel包装:

主要还是为了得到一个池GenericObjectPool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.nia.rpc.core.client;

import com.nia.rpc.core.utils.ConnectionObjectFactory;
import io.netty.channel.Channel;
import lombok.Data;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;

/**
 * Per-provider channel configuration: holds the "host:port" connection string
 * and a GenericObjectPool of Netty channels to that provider.
 *
 * Author 知秋
 * Created by Auser on 2017/2/18.
 */
@Data
public class ChannelConf {
// "host:port" string matching the address registered in ZooKeeper.
private String connStr;
private String host;
// NOTE(review): despite its name, `ip` stores the PORT number (see the
// constructor). Renaming it would change the lombok-generated getIp()/setIp()
// that callers may rely on, so it is only documented here — TODO rename in a
// breaking release.
private int ip;
private Channel channel;
// Pool of live channels to this provider, backed by ConnectionObjectFactory.
private ObjectPool<Channel> channelObjectPool;

public ChannelConf(String host, int port) {
this.host = host;
this.ip = port;
this.connStr = host + ":" + ip;
channelObjectPool = new GenericObjectPool<>(new ConnectionObjectFactory(host, port));
}

// Closes the pool, which destroys (closes) every pooled channel.
public void close() {
channelObjectPool.close();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ChannelConf{");
sb.append("connStr='").append(connStr).append('\'');
sb.append(", host='").append(host).append('\'');
sb.append(", ip=").append(ip);
sb.append(", channel=").append(channel);
sb.append(", channelObjectPool=").append(channelObjectPool);
sb.append('}');
return sb.toString();
}
}

对客户端接口逻辑进行实现:

在客户端启动时通过init()做如下逻辑:

  1. 通过CuratorFrameworkFactory来连接zk;
  2. 通过 curator API 的 Path Cache 来监控一个 ZNode 的子节点。这里是pathChildrenCache,然后开始编写监控逻辑(方便动态地增加、删除channel):
    • 先在之前拿到zk下的所有节点,然后根据服务注册规则,所得到拼接服务的节点的key,就可以得到提供服务的所有子节点list,这里是newServiceData
    • 关闭删除本地缓存中多出的channel:
      • ​通过遍历channelWrappers来判断newServiceData是否包含有其中的链接,没包含的就清理掉,关闭相应对象池,并从channelWrappers移除;
    • 增加channelWrappers中未存在的服务连接channel
  3. 回到主线程,通过List<String> strings = children.forPath(serviceZKPath);得到提供服务的所有子节点list
  4. strings为空,则报出无可用服务的运行时异常处理;
  5. 不为空,添加进channelWrappers

在客户端调用接口方法时实现的逻辑:

  1. 对接口方法调用进行request包装;
  2. 根据相应服务来得到channel包装;
  3. 根据channel = channelWrapper.getChannelObjectPool().borrowObject();得到channel
  4. 将请求写入channel并刷出去channel.writeAndFlush(request);
  5. 然后:(看里面代码注释即可,已经很详细了)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

try {
channel.writeAndFlush(request);
//建立一个ResponseMap,将RequestId作为键,服务端回应的内容作为值保存于BlockingQueue,
// 最后一起保存在这个ResponseMap中
BlockingQueue<Response> blockingQueue = new ArrayBlockingQueue<>(1);
ResponseMapHelper.responseMap.put(request.getRequestId(), blockingQueue);
//poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

return blockingQueue.poll(requestTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RequestTimeoutException("service" + serviceName + " method " + method + " timeout");
} finally {
try {
//拿出去的channel记得还回去
channelWrapper.getChannelObjectPool().returnObject(channel);
} catch (Exception e) {
e.printStackTrace();
}
//删除此键值对,help GC
ResponseMapHelper.responseMap.remove(request.getRequestId());
}

客户端接口的动态代理逻辑

由上面实现的CglibRpcProxy通过newInstance()得到一个实例,并调用其相关实现

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> T proxyInterface(Class<T> serviceInterface) {
// Lazily default to the CGLib-based proxy implementation.
if (clientProxyClass == null) {
clientProxyClass = CglibRpcProxy.class;
}
try {
// Fresh proxy-factory instance per call; it is a cheap, stateless object.
rpcProxy = clientProxyClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
// Hand `this` (the Client) to the proxy so intercepted calls reach sendMessage().
return rpcProxy.proxyInterface(this, serviceInterface);
}

最后,客户端关闭逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void close() {
// Shut down in three stages: first the ZooKeeper connection, then every
// channel object pool, and finally Netty's event loop group.
if (curatorFramework != null) {
curatorFramework.close();
}
try {
for (ChannelConf cw : channelWrappers) {
cw.close();
}
} finally {
// Always release Netty threads, even if a pool close failed.
eventLoopGroup.shutdownGracefully();
}
}

具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package com.nia.rpc.core.client;

import com.google.common.base.Splitter;
import com.nia.rpc.core.exception.RequestTimeoutException;
import com.nia.rpc.core.protocol.Request;
import com.nia.rpc.core.protocol.Response;
import com.nia.rpc.core.rpcproxy.CglibRpcProxy;
import com.nia.rpc.core.rpcproxy.RpcProxy;
import com.nia.rpc.core.utils.ResponseMapHelper;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.nia.rpc.core.utils.Constant.ZK_DATA_PATH;

/**
* Author 知秋
* Created by Auser on 2017/2/18.
*/
public class ClientImpl implements Client{
private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class);

private static AtomicLong atomicLong = new AtomicLong();
// 通过此发布的服务名称,来寻找对应的服务提供者
private String serviceName;
private int requestTimeoutMillis = 10 * 1000;
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
private String zkConn;
private CuratorFramework curatorFramework;
private Class<? extends RpcProxy> clientProxyClass;
private RpcProxy rpcProxy;

// 存放ChannelConf到一个CopyOnWriteArrayList中,这个本就是读多写少的场景(服务注册后很少会发生状态改变),所以很适用
public static CopyOnWriteArrayList<ChannelConf> channelWrappers = new CopyOnWriteArrayList<>();

public ClientImpl(String serviceName) {
this.serviceName = serviceName;
}

public void init() {

curatorFramework = CuratorFrameworkFactory.newClient(getZkConn(), new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();


final GetChildrenBuilder children = curatorFramework.getChildren();
try {
final String serviceZKPath = ZK_DATA_PATH + serviceName;
//通过curator API 的Path Cache用来监控一个ZNode的子节点.
// 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态,
// 会包含最新的子节点, 子节点的数据和状态。
// 这也正如它的名字表示的那样, 那监控path。
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, serviceZKPath, true);
pathChildrenCache.start();

pathChildrenCache.getListenable().addListener((client, event) -> {
LOGGER.info("Listen Event {}", event);
//通过路径拿到此节点下可以提供服务的实现类节点连接地址
List<String> newServiceData = children.forPath(serviceZKPath);
LOGGER.info("Server {} list change {}", serviceName, newServiceData);

// 关闭删除本地缓存中多出的channel

for (ChannelConf cw : channelWrappers) {
String connStr = cw.getConnStr();
if (!newServiceData.contains(connStr)) {
cw.close();
LOGGER.info("Remove channel {}", connStr);
channelWrappers.remove(cw);
}
}

// 增加本地缓存中不存在的连接地址
for (String connStr : newServiceData) {
boolean containThis = false;
for (ChannelConf cw : channelWrappers) {
if (connStr != null && connStr.equals(cw.getConnStr())) {
containThis = true;
}
}
if (!containThis) {
addNewChannel(connStr);
}
}
});

List<String> strings = children.forPath(serviceZKPath);
if (CollectionUtils.isEmpty(strings)) {
throw new RuntimeException("No Service available for " + serviceName);
}

LOGGER.info("Found Server {} List {}", serviceName, strings);
for (String connStr : strings) {
try {
addNewChannel(connStr);
} catch (Exception e) {
LOGGER.error("Add New Channel Exception", e);
}
}

} catch (Exception e) {
e.printStackTrace();
}
}


private void addNewChannel(String connStr) {
try {
List<String> strings = Splitter.on(":").splitToList(connStr);
if (strings.size() != 2) {
throw new RuntimeException("Error connection str " + connStr);
}
String host = strings.get(0);
int port = Integer.parseInt(strings.get(1));
ChannelConf channelWrapper = new ChannelConf(host, port);
channelWrappers.add(channelWrapper);
LOGGER.info("Add New Channel {}, {}", connStr, channelWrapper);
} catch (Exception e) {
e.printStackTrace();
}
}


private ChannelConf selectChannel() {
Random random = new Random();
//同一个服务下有好几个链接地址的实现,那就选一个就是,其实为集群部署考虑,
// 每一台服务器部署有相同的服务,选择其一来处理即可,假如是nginx代理那就无所谓了
int size = channelWrappers.size();
if (size < 1) {
return null;
}
int i = random.nextInt(size);
return channelWrappers.get(i);
}

@Override
public Response sendMessage(Class<?> clazz, Method method, Object[] args) {

Request request = new Request();
request.setRequestId(atomicLong.incrementAndGet());
request.setMethod(method.getName());
request.setParams(args);
request.setClazz(clazz);
request.setParameterTypes(method.getParameterTypes());

ChannelConf channelWrapper = selectChannel();

if (channelWrapper == null) {
Response response = new Response();
RuntimeException runtimeException = new RuntimeException("Channel is not active now");
response.setThrowable(runtimeException);
return response;
}
//当channel的配置链接不为空的时候,就可以取到channel了
Channel channel = null;
try {
channel = channelWrapper.getChannelObjectPool().borrowObject();
} catch (Exception e) {
e.printStackTrace();
}
if (channel == null) {
Response response = new Response();
RuntimeException runtimeException = new RuntimeException("Channel is not available now");
response.setThrowable(runtimeException);
return response;
}


try {
channel.writeAndFlush(request);
//建立一个ResponseMap,将RequestId作为键,服务端回应的内容作为值保存于BlockingQueue,
// 最后一起保存在这个ResponseMap中
BlockingQueue<Response> blockingQueue = new ArrayBlockingQueue<>(1);
ResponseMapHelper.responseMap.put(request.getRequestId(), blockingQueue);
//poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

return blockingQueue.poll(requestTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//这个异常是自定义的,只是为了说明字面意思
throw new RequestTimeoutException("service" + serviceName + " method " + method + " timeout");
} finally {
try {
//拿出去的channel记得还回去
channelWrapper.getChannelObjectPool().returnObject(channel);
} catch (Exception e) {
e.printStackTrace();
}
//删除此键值对,help GC
ResponseMapHelper.responseMap.remove(request.getRequestId());
}
}

@Override
public <T> T proxyInterface(Class<T> serviceInterface) {
if (clientProxyClass == null) {
clientProxyClass = CglibRpcProxy.class;
}
try {
rpcProxy = clientProxyClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
return rpcProxy.proxyInterface(this, serviceInterface);
}

@Override
public void close() {
//注意要关三处地方,一个是先关闭zookeeper的连接,另一个是channel池对象,最后是netty的断开关闭
if (curatorFramework != null) {
curatorFramework.close();
}
try {
for (ChannelConf cw : channelWrappers) {
cw.close();
}
} finally {
eventLoopGroup.shutdownGracefully();
}
}

public String getZkConn() {
return zkConn;
}

public void setZkConn(String zkConn) {
this.zkConn = zkConn;
}

public int getRequestTimeoutMillis() {
return requestTimeoutMillis;
}

public void setRequestTimeoutMillis(int requestTimeoutMillis) {
this.requestTimeoutMillis = requestTimeoutMillis;
}
}

4,对外提供封装调用接口

在这里顺带教大家一个链式编程风格的设计,具体参考:Java里实现链式编程风格

服务端:ServerBuilder

代码太简单,就不解释了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.nia.rpc.core.bootstrap;

import com.google.common.base.Preconditions;
import com.nia.rpc.core.server.Server;
import com.nia.rpc.core.server.ServerImpl;

/**
 * Fluent builder assembling a {@link Server}: collects port, service name,
 * implementation object and ZooKeeper connection string, then validates and
 * constructs a {@link ServerImpl}.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
public class ServerBuilder {

    private int port;
    private String serviceName;
    private Object serviceImpl;
    private String zkConn;

    private ServerBuilder() {
        // Obtain instances via builder().
    }

    /** Entry point of the fluent chain. */
    public static ServerBuilder builder() {
        return new ServerBuilder();
    }

    /** TCP port the server will listen on. */
    public ServerBuilder port(int port) {
        this.port = port;
        return this;
    }

    /** Name under which the service is registered in ZooKeeper. */
    public ServerBuilder serviceName(String serviceName) {
        this.serviceName = serviceName;
        return this;
    }

    /** Object implementing the exposed service interface. */
    public ServerBuilder serviceImpl(Object serviceImpl) {
        this.serviceImpl = serviceImpl;
        return this;
    }

    /** ZooKeeper connection string, e.g. "127.0.0.1:2181". */
    public ServerBuilder zkConn(String zkConn) {
        this.zkConn = zkConn;
        return this;
    }

    /** Validates the collected settings and creates the server instance. */
    public Server build() {
        Preconditions.checkNotNull(serviceImpl);
        Preconditions.checkNotNull(serviceName);
        Preconditions.checkNotNull(zkConn);
        Preconditions.checkArgument(port > 0);
        return new ServerImpl(this.port, this.serviceImpl, this.serviceName, this.zkConn);
    }
}

客户端:ClientBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.nia.rpc.core.bootstrap;

import com.google.common.base.Preconditions;
import com.nia.rpc.core.client.ClientImpl;
import com.nia.rpc.core.rpcproxy.CglibRpcProxy;
import com.nia.rpc.core.rpcproxy.RpcProxy;

/**
 * Fluent builder that assembles an RPC client and returns a dynamic proxy of
 * the requested service interface.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
public class ClientBuilder<T> {

private String serviceName;
private String zkConn;
private Class<T> serviceInterface;
// Default request timeout in milliseconds, forwarded to the ClientImpl.
private int requestTimeoutMillis = 10000;
// NOTE(review): configurable via clientProxyClass(...) but never passed on to
// ClientImpl in build(), so the setting currently has no effect — ClientImpl
// always falls back to CglibRpcProxy. TODO wire it through once ClientImpl
// exposes a setter for its proxy class.
private Class<? extends RpcProxy> clientProxyClass = CglibRpcProxy.class;

public static <T> ClientBuilder<T> builder() {
return new ClientBuilder<>();
}

public ClientBuilder<T> serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}

public ClientBuilder<T> zkConn(String zkConn) {
this.zkConn = zkConn;
return this;
}

public ClientBuilder<T> serviceInterface(Class<T> serviceInterface) {
this.serviceInterface = serviceInterface;
return this;
}

public ClientBuilder<T> requestTimeout(int requestTimeoutMillis) {
this.requestTimeoutMillis = requestTimeoutMillis;
return this;
}

public ClientBuilder<T> clientProxyClass(Class<? extends RpcProxy> clientProxyClass) {
this.clientProxyClass = clientProxyClass;
return this;
}

// Validates settings, initializes the client (ZooKeeper discovery) and
// returns the service proxy.
public T build() {
// Guava is already on the classpath (Curator depends on it), so its
// Preconditions are used for validation.
Preconditions.checkNotNull(serviceInterface);
Preconditions.checkNotNull(zkConn);
Preconditions.checkNotNull(serviceName);
ClientImpl client = new ClientImpl(this.serviceName);
client.setZkConn(this.zkConn);
client.setRequestTimeoutMillis(this.requestTimeoutMillis);
client.init();
return client.proxyInterface(this.serviceInterface);
}
}

至此,migo-RPC的核心模块完成

对核心模块进行二次封装,在migo-rpc-provider模块中提供更人性化的调用接口

新建子项目,并导入pom依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- migo-rpc-provider: Spring-friendly wrapper around migo-core exposing
     FactoryBean-based server/client configuration. -->
<!-- Inherits groupId/version (and any dependencyManagement) from the aggregator parent. -->
<parent>
<artifactId>parent</artifactId>
<groupId>com.nia.rpc</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>migo-rpc-provider</artifactId>

<properties>
<spring.version>4.3.6.RELEASE</spring.version>
</properties>


<dependencies>
<!-- The RPC core this module wraps; version tracks the parent. -->
<dependency>
<groupId>com.nia.rpc</groupId>
<artifactId>migo-core</artifactId>
<version>${parent.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<!-- Spring dependencies are marked optional: consumers supply their own Spring. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</project>

分别创建服务端和客户端的工厂bean:

通过下面代码可以很容易看到所完成的链式风格:

ServerFactoryBean:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.nia.rpc.factory;

import com.nia.rpc.core.bootstrap.ServerBuilder;
import com.nia.rpc.core.server.Server;
import com.nia.rpc.core.server.ServerImpl;
import lombok.Data;
import org.springframework.beans.factory.FactoryBean;

/**
 * Spring FactoryBean that configures, starts and stops an RPC server.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
@Data
public class ServerFactoryBean implements FactoryBean<Object> {

    private Class<?> serviceInterface;
    private Object serviceImpl;
    private String ip;
    private int port;
    private String serviceName;
    private String zkConn;
    private ServerImpl rpcServer;

    /**
     * Builds the server, registers the service and starts listening.
     * BUG FIX: the original stored the built server only in a LOCAL variable that
     * shadowed the {@link #rpcServer} field, so the field stayed null and
     * {@link #serviceOffline()} always threw a NullPointerException.
     */
    public void start() {
        Server server = ServerBuilder
                .builder()
                .serviceImpl(serviceImpl)
                .serviceName(serviceName)
                .zkConn(zkConn)
                .port(port)
                .build();
        // ServerBuilder#build() returns a ServerImpl (see its implementation).
        this.rpcServer = (ServerImpl) server;
        server.start();
    }

    /** Unregisters the service and shuts down the server started by {@link #start()}. */
    public void serviceOffline() {
        if (rpcServer != null) {
            rpcServer.shutdown();
        }
    }

    @Override
    public Object getObject() throws Exception {
        return this;
    }

    @Override
    public Class<?> getObjectType() {
        return this.getClass();
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}
ClientFactoryBean:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.nia.rpc.factory;

import com.nia.rpc.core.bootstrap.ClientBuilder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;

/**
 * Spring FactoryBean producing an RPC client proxy for {@code serviceInterface},
 * discovered via ZooKeeper under {@code serviceName}.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
@Data
public class ClientFactoryBean<T> implements FactoryBean<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientFactoryBean.class);

    private Class<T> serviceInterface;
    private String serviceName;
    private String zkConn;

    /** Assembles the client through ClientBuilder and returns the service proxy. */
    @Override
    public T getObject() throws Exception {
        ClientBuilder<T> clientBuilder = ClientBuilder.<T>builder()
                .zkConn(zkConn)
                .serviceName(serviceName)
                .serviceInterface(serviceInterface);
        return clientBuilder.build();
    }

    @Override
    public Class<?> getObjectType() {
        return serviceInterface;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}

示例使用

这里我们通过一个SpringbootDemo来演示如何使用:

具体代码结构请看源码

定义一个测试service接口:
1
2
3
4
5
6
7
8
9
10
11
12
package com.nia.rpc.example.service;

/**
 * Sample service contract used by the demo server and client.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
public interface HelloWorld {
// Echoes the greeting back (server adds a prefix).
String say(String hello);

// Sum of the two operands.
int sum(int a, int b);
// Larger of the two values.
int max(Integer a, Integer b);
}
编写其实现类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.nia.rpc.example.service;

/**
 * Demo implementation of {@link HelloWorld} exposed by the sample server.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
public class HelloWorldImpl implements HelloWorld {

    /** Echoes the greeting back with a "server: " prefix. */
    @Override
    public String say(String hello) {
        return "server: " + hello;
    }

    /** Adds the two operands. */
    @Override
    public int sum(int a, int b) {
        return b + a;
    }

    /** Returns the larger of the two values (either one on a tie). */
    @Override
    public int max(Integer a, Integer b) {
        return Math.max(a, b);
    }
}
编写Springboot服务端启动类:
SpringServerConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.nia.rpc.example.server;

import com.nia.rpc.core.utils.NetUtils;
import com.nia.rpc.example.service.HelloWorld;
import com.nia.rpc.example.service.HelloWorldImpl;
import com.nia.rpc.factory.ServerFactoryBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot configuration that registers the demo service and launches the
 * RPC server in a background thread.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
@SpringBootApplication
public class SpringServerConfig {

    /** The service implementation exposed over RPC. */
    @Bean
    public HelloWorld hello() {
        return new HelloWorldImpl();
    }

    /**
     * Configures the server factory and starts it on a dedicated thread so
     * Spring's startup is not blocked by the listening server.
     */
    @Bean
    public ServerFactoryBean serverFactoryBean() {
        final ServerFactoryBean factory = new ServerFactoryBean();
        factory.setPort(9090);
        factory.setServiceInterface(HelloWorld.class);
        // The registered name acts like an annotation for now; a custom
        // annotation-based registration is planned for a later iteration.
        factory.setServiceName("hello");
        factory.setServiceImpl(hello());
        factory.setZkConn("127.0.0.1:2181");

        Runnable bootstrap = () -> {
            try {
                factory.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        new Thread(bootstrap, "RpcServer").start();
        return factory;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringServerConfig.class, "--server.port=8082");
    }
}
编写服务调用端启动类:
SpringClientConfig:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.nia.rpc.example.client;

import com.nia.rpc.example.service.HelloWorld;
import com.nia.rpc.factory.ClientFactoryBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Spring Boot configuration plus REST controller demonstrating the RPC client:
 * builds a HelloWorld proxy bean and forwards HTTP calls to it.
 *
 * Author 知秋
 * Created by Auser on 2017/2/19.
 */
@Configuration
@RestController
@SpringBootApplication
@RequestMapping("/test")
public class SpringClientConfig {

    @Resource
    private HelloWorld helloWorld;

    /** Builds the RPC client proxy bean for the HelloWorld service. */
    @Bean
    public HelloWorld clientFactoryBean() throws Exception {
        ClientFactoryBean<HelloWorld> factory = new ClientFactoryBean<>();
        factory.setZkConn("127.0.0.1:2181");
        factory.setServiceName("hello");
        factory.setServiceInterface(HelloWorld.class);
        return factory.getObject();
    }

    /** HTTP endpoint forwarding the given text to the remote service. */
    @RequestMapping("/hello")
    public String hello(String say) {
        return helloWorld.say(say);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringClientConfig.class, "--server.port=8081");
    }
}

测试截图:

您的支持将鼓励我继续创作!